package com.tpvlog.dfs.datanode.server;

import com.tpvlog.dfs.datanode.client.NameNodeRpcClient;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;

import static com.tpvlog.dfs.datanode.config.DataNodeConfig.NIO_PORT;

/**
 * DataNode NIO通信组件
 *
 * @author Ressmix
 */
public class DataNodeNIOServer extends Thread {
    public static final Integer PROCESSOR_NUM = 10;
    public static final Integer IO_THREAD_NUM = 10;

    private Selector selector;
    private List<NioProcessor> processors = new ArrayList<>();
    private NameNodeRpcClient rpcClient;

    public DataNodeNIOServer(NameNodeRpcClient rpcClient) {
        this.rpcClient = rpcClient;
        init();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 阻塞等待
                selector.select();
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keysIterator.next();
                    keysIterator.remove();
                    // 建立连接
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel channel = serverSocketChannel.accept();
                        if (channel != null) {
                            // 将建立连接的SocketChannel交给Processor处理
                            channel.configureBlocking(false);
                            Integer processorIndex = new Random().nextInt(PROCESSOR_NUM);
                            NioProcessor processor = processors.get(processorIndex);
                            processor.addChannel(channel);
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    /*-------------------------------------------------PRIVATE METHOD----------------------------------------------*/

    private void init() {
        ServerSocketChannel serverChannel = null;
        try {
            // 监听OP_ACCEPT事件
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("NIOServer已经启动，开始监听端口：" + NIO_PORT);

            // 创建响应队列
            NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
            // 创建Processor线程，每个线程关联一个响应队列
            for (int i = 0; i < PROCESSOR_NUM; i++) {
                NioProcessor processor = new NioProcessor(i);
                processors.add(processor);
                processor.start();
                // 每个Processor线程分配一个响应队列
                responseQueues.assignQueue(i);
            }

            // 创建IO线程
            for (int i = 0; i < IO_THREAD_NUM; i++) {
                new IOThread(rpcClient).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
